
Core: Add InternalData read and write builders #12060

Merged: 12 commits merged into apache:main on Feb 13, 2025

Conversation

@rdblue (Contributor) commented Jan 23, 2025

This adds InternalData with read and write builder interfaces that can be used with Avro and Parquet by passing a FileFormat. Formats are registered by calling InternalData.register with callbacks to create format-specific builders.

The class is InternalData because registered builders are expected to use the internal object model that is used for Iceberg metadata files. Using a specific object model avoids needing to register callbacks to create value readers and writers that produce the format needed by the caller.

To demonstrate the new interfaces, this PR implements them using both Avro and Parquet. Parquet can't be used yet, because it would fail at runtime until #11904 is committed (it is also missing custom type support).

Avro is working. To demonstrate that the builders can be used for metadata, this updates ManifestWriter, ManifestReader, and ManifestListWriter to use InternalData builders. It was also necessary to migrate the metadata classes to implement StructLike for the internal writers instead of IndexedRecord.
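For orientation, here is a rough sketch of how the registered builders are meant to be used. The entry points and builder methods shown (write, read, schema, project, build) are assumptions based on the description above, not a quote of the merged API:

import org.apache.iceberg.FileFormat;
import org.apache.iceberg.InternalData;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;

class InternalDataSketch {
  // Write metadata rows using whichever builder was registered for the format.
  static FileAppender<StructLike> newAppender(OutputFile file, Schema schema) {
    return InternalData.write(FileFormat.AVRO, file) // dispatches to the registered callback
        .schema(schema)
        .build();
  }

  // Read the rows back as StructLike, the internal object model.
  static CloseableIterable<StructLike> readRows(InputFile file, Schema schema) {
    return InternalData.read(FileFormat.AVRO, file)
        .project(schema)
        .build();
  }
}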

DynMethods.StaticMethod registerParquet =
    DynMethods.builder("register")
        .impl("org.apache.iceberg.parquet.Parquet")
        .buildStaticChecked();
Contributor Author:

This uses DynMethods to call Parquet's register method directly, rather than using a ServiceLoader. There is no need for that complexity because we want to keep the number of supported formats small rather than allow plugging in custom formats.

I'm also considering refactoring to make the register method here package-private so that no one can easily call it.
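Assembled from the two fragments quoted in this thread (the builder above and the invoke/catch further down), the full registration path plausibly looks like the following; the enclosing static block is an assumption:

  static {
    try {
      // Look up Parquet.register() reflectively so core does not need a
      // compile-time dependency on the iceberg-parquet module.
      DynMethods.StaticMethod registerParquet =
          DynMethods.builder("register")
              .impl("org.apache.iceberg.parquet.Parquet")
              .buildStaticChecked();

      registerParquet.invoke();

    } catch (NoSuchMethodException e) {
      // failing to load Parquet is normal and does not require a stack trace
    }
  }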

Member:

I do not understand this one: why can we call Avro's register() directly but not Parquet.register()? I'm also not clear on the ServiceLoader comment; is that just to note that we don't want to make this dynamic and only want hardcoded formats to be supported?

Contributor:

This is due to Gradle project-level isolation. Avro is currently included in core, but Parquet is in a separate subproject. I'm in favor of being explicit about what is supported (i.e. hard-coded), but we would like to keep Parquet in a separate project to reduce dependency proliferation from api/core.

Contributor:

What about using the Java ServiceLoader to load the internal readers and writers?

I have created a WIP for testing out how the DataFile readers could work: #12069

Contributor:

First I implemented it using the registry method like the one in this PR (7e171cc), then moved to the ServiceLoader method.

In my head the two problems are very similar.

Contributor Author:

The ServiceLoader framework is error-prone and commonly broken by jar bundling. In addition, we do not want anything else registered, so it is not needed; it would make it easier to plug in here, which is exactly what we are trying to avoid.

Member:

> I do not understand this one: why can we call Avro's register() directly but not Parquet.register()? I'm also not clear on the ServiceLoader comment; is that just to note that we don't want to make this dynamic and only want hardcoded formats to be supported?

Agree with @RussellSpitzer here. Maybe it would be better to add a comment about it.

-  public org.apache.avro.Schema getSchema() {
-    return AVRO_SCHEMA;
+  public int size() {
+    return MANIFEST_LIST_SCHEMA.columns().size();
Contributor Author:

Avro schemas are no longer needed when using StructLike rather than IndexedRecord.
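For context on why the schemas drop out, compare what the two interfaces demand of a metadata class (both shown as they exist upstream; IndexedRecord inherits getSchema from GenericContainer):

// Avro's IndexedRecord ties every record instance to an Avro schema:
public interface IndexedRecord extends GenericContainer {
  // GenericContainer requires: org.apache.avro.Schema getSchema();
  void put(int i, Object v);
  Object get(int i);
}

// Iceberg's StructLike is positional and carries no schema:
public interface StructLike {
  int size();
  <T> T get(int pos, Class<T> javaClass);
  <T> void set(int pos, T value);
}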

  private DataFile wrapped = null;

  IndexedDataFile(org.apache.avro.Schema avroSchema) {
    this.avroSchema = avroSchema;
    this.partitionWrapper = new IndexedStructLike(avroSchema.getField("partition").schema());
Contributor Author:

There is also no need for a wrapper to adapt PartitionData to IndexedRecord because it is already StructLike.

Member:

Big fan of this change

@@ -90,14 +103,18 @@ private enum Codec {
  }

  public static WriteBuilder write(OutputFile file) {
    if (file instanceof EncryptedOutputFile) {
Contributor Author:

Encryption is handled by adding this. The read side already has a similar check.
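A sketch of the likely shape of that check; the delegation to an encryption-aware overload is an assumption, not a quote of the PR:

public static WriteBuilder write(OutputFile file) {
  if (file instanceof EncryptedOutputFile) {
    // route encrypted files to the encryption-aware overload instead of
    // treating them as plain output files
    return write((EncryptedOutputFile) file);
  }

  return new WriteBuilder(file);
}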

@@ -76,6 +76,15 @@ public void setSchema(Schema schema) {
    initReader();
  }

  @Override
  public void setCustomTypes(
Contributor Author:

Because the InternalReader is no longer created by ManifestReader, the custom types now need to be passed to the read builder and forwarded to the reader here. Custom type support will need to be implemented for Parquet as well.
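A hypothetical sketch of that forwarding, assuming the builder collects field-ID-to-class mappings and hands them to the reader it constructs (all names here are illustrative, not the PR's code):

// On the read builder: collect the mappings.
private final Map<Integer, Class<? extends StructLike>> customTypes = Maps.newHashMap();

public ReadBuilder setCustomType(int fieldId, Class<? extends StructLike> structClass) {
  customTypes.put(fieldId, structClass); // remembered until build time
  return this;
}

// When the reader is constructed: forward the mappings to it.
private void configure(InternalReader<?> reader) {
  reader.setCustomTypes(customTypes);
}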

@@ -20,7 +20,7 @@

 import java.util.Map;

-/** An interface for Avro DatumReaders to support custom record classes. */
+/** An interface for Avro DatumReaders to support custom record classes by name. */
Contributor Author:

This is used to distinguish between Iceberg's custom types, which use field IDs and StructLike, and Avro's, which worked by renaming Avro records to class names that would be dynamically loaded.

  }

  private static WriteBuilder writeInternal(OutputFile outputFile) {
    return write(outputFile);
Contributor Author:

This will be where the internal object model is injected for Parquet.

@@ -1171,6 +1188,16 @@ public ReadBuilder withNameMapping(NameMapping newNameMapping) {
    return this;
  }

  @Override
  public ReadBuilder setRootType(Class<? extends StructLike> rootClass) {
    throw new UnsupportedOperationException("Custom types are not yet supported");
Contributor Author:

When the internal object model is complete, this should be implemented to instantiate the expected metadata classes while reading.

      registerParquet.invoke();

    } catch (NoSuchMethodException e) {
      // failing to load Parquet is normal and does not require a stack trace
Member:

Is this normal for now? Don't we expect this to be a bug in the future? I'm also a little interested in when we would actually fail here if we are using the Iceberg repo as-is.

Contributor:

I wouldn't say that it's normal to fail. I'm actually not aware of any situations where the api/core modules are used but parquet isn't included. I think in almost all scenarios, it'll be available.

Contributor Author:

This would be normal whenever the iceberg-parquet module isn't in the classpath. For instance, the manifest read and write tests that are currently using InternalData in this PR hit this but operate normally because Parquet isn't used.

Member:

This still feels a bit odd. If it can only plausibly occur during tests, I'm not sure we should even log this. We would still get errors at runtime if you attempt to get a reader or writer when Parquet isn't loaded.

      return writeBuilder.apply(file);
    }

    throw new UnsupportedOperationException(
Member:

nit: Personally I think it may be a bit clearer to extract the handling of a missing writer/reader.

Maybe have

private static Function<OutputFile, WriteBuilder> writerFor(FileFormat format) {
  Function<OutputFile, WriteBuilder> writer = WRITE_BUILDERS.get(format);
  if (writer == null) {
    throw new UnsupportedOperationException("Cannot write format: " + format);
  } else {
    return writer;
  }
}

so that this code is just

return writerFor(format).apply(file);

Mostly I feel a little uneasy about the implicit else in the current logic, so having an explicit else would also make me feel a little better.

  WriteBuilder meta(String property, String value);

  /**
   * Set a file metadata properties from a Map.
Member:

Suggested change:
- * Set a file metadata properties from a Map.
+ * Set file metadata properties from a Map.

  /**
   * Set a writer configuration property.
   *
   * <p>Write configuration affects writer behavior. To add file metadata properties, use {@link
Member:

Suggested change:
- * <p>Write configuration affects writer behavior. To add file metadata properties, use {@link
+ * <p>Write configuration affects this writer's behavior. To add metadata properties to the written file use {@link

?

Contributor Author:

"This" doesn't refer to a writer. This is configuring a builder that creates the writer, so I think that the existing language is correct.

Member:

I guess I just find these two APIs a little confusing. One is properties for the writer we are building, and one is metadata for files created by that writer. It's probably clear enough, though, since I think I have it straight.

Contributor:

This is aligned with the existing writer APIs. We have had this for quite some time already, so I think we should stick to it unless we have strong reasons to change. The change in the javadoc helps too.

  ReadBuilder reuseContainers();

  /** Set a custom class for in-memory objects at the schema root. */
  ReadBuilder setRootType(Class<? extends StructLike> rootClass);
Member:

I'll probably get to this later in the PR, but I'm interested in why we need both this and setCustomType.

Member:

OK, I see how it's used below. I'm wondering if, instead of needing this, we could just automatically set these readers based on the root type, i.e.

setRootType(ManifestEntry) // automatically sets field types based on ManifestEntry

Or do we have a plan for using this in a more custom manner in the future?

Contributor Author:

The problem this is solving is that we don't have an assigned ID for the root type. We could use a sentinel value like -1, but that could technically collide. I just don't want to rely on setCustomType(ROOT_FIELD_ID, SomeObject.class).
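To make the split concrete, a usage sketch (the field-ID constant and the surrounding calls are illustrative, not the PR's code):

CloseableIterable<StructLike> entries =
    InternalData.read(FileFormat.AVRO, inputFile)
        .project(entrySchema)
        .setRootType(GenericManifestEntry.class)                   // the root has no field ID
        .setCustomType(DATA_FILE_FIELD_ID, GenericDataFile.class)  // nested structs keyed by ID
        .build();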

@RussellSpitzer (Member) commented:

From a release perspective, should we merge this post-1.8? Just thinking we probably want it in the build for a bit before we ship it. I know that partition stats is downstream of this, so that is a dependency to consider, but I'm not sure we can get that all together rapidly if we want to do this in the next week or so.

@rdblue (Contributor Author) commented Jan 23, 2025

> From a release perspective, should we merge this post-1.8? Just thinking we probably want it in the build for a bit before we ship it. I know that partition stats is downstream of this, so that is a dependency to consider, but I'm not sure we can get that all together rapidly if we want to do this in the next week or so.

I agree. There's no need to target this for 1.8, especially when it isn't clear that the Parquet internal object model will make it. I just wanted to get this out for discussion since we are currently blocked on creating Parquet metadata files until we either merge Parquet into core or implement something like this.

@rdblue force-pushed the internal-add-internal-data-builders branch from ef05f9c to 01a9848 (January 27, 2025 22:39)
@rdblue force-pushed the internal-add-internal-data-builders branch from 01a9848 to acb5778 (January 27, 2025 22:55)
@rdblue force-pushed the internal-add-internal-data-builders branch from c651737 to 952e89b (February 11, 2025 21:47)
   * @param value config value
   * @return this for method chaining
   */
  WriteBuilder set(String property, String value);
Contributor:

Is it intentional that we have meta(Map), but we don't have set(Map)?

Contributor Author:

meta(Map) is used in the code paths that call this. We could also plumb through setAll, but I was going for a minimal set of methods here.

-    this.avroSchema = AvroSchemaUtil.convert(entrySchema(partitionType), "manifest_entry");
-    this.fileWrapper = new IndexedDataFile(avroSchema.getField("data_file").schema());
+  ManifestEntryWrapper() {
+    this.size = entrySchema(Types.StructType.of()).columns().size();
Member:

nit: We have an EmptyStructLike we could use here. Although maybe we should just have "of()" always return a singleton.

Member:

Thinking more, maybe this should just be a constant? It feels like we are doing a lot of work to get the number of elements in the data file schema.

Contributor Author:

This is a Type, so I don't think EmptyStructLike would work. We can definitely have a special case for an empty schema though.

-  IndexedManifestEntry(Long commitSnapshotId, Types.StructType partitionType) {
-    this.avroSchema = AvroSchemaUtil.convert(entrySchema(partitionType), "manifest_entry");
+  ManifestEntryWrapper(Long commitSnapshotId) {
+    this.size = entrySchema(Types.StructType.of()).columns().size();
Member:

Same comment as above; it just seems like we are doing a lot of work to get a constant.

Contributor Author:

I know what you mean, but I didn't want this to get out of sync with entrySchema. Still calling that method seemed like the safest path.

@rdblue (Contributor Author) commented Feb 13, 2025

Merging this now that the 1.8.0 vote has passed. Thanks for reviewing, everyone!

@rdblue merged commit b8fdd84 into apache:main on Feb 13, 2025 (46 checks passed).